Scaling Python

Python in HPC

Saudi Arabian High Performance Computing Users' Group Conference 2012

King Abdullah University of Science and Technology

Aron Ahmadia, PhD
Chief Technology Officer
RunMyCode.org


About This Tutorial

PyHPC Tutorial on GitHub

These presentation materials are part of a continuously updated tutorial on Python for High Performance Computing. To get the presentation materials as they were delivered today, please use the SC2012 tag on github. Future versions of this presentation will be found at:

https://github.com/pyhpc/pyhpc-tutorial

Please set all permanent bookmarks to this URL.

Direct download of this SC2012 presentation

You can download a zip or tarball from the GitHub SC2012 tag:

wget --no-check-certificate https://github.com/pyhpc/pyhpc-tutorial/zipball/SC2012
wget --no-check-certificate https://github.com/pyhpc/pyhpc-tutorial/tarball/SC2012

Check out from git

git clone https://github.com/pyhpc/pyhpc-tutorial.git

Viewing the read-only version of this presentation on nbviewer:


Interacting with the Tutorial Slides

This tutorial is an interactive worksheet designed to encourage you to try out the lessons during the demonstration. If you are looking at the PDF version of these slides, we encourage you to download the updated version (see previous slide) and try the interactive version.

To run the interactive version of this notebook, you will need a Python 2.7 environment including the following packages (a quick version check is sketched after this list):

  • IPython >= 0.13
  • numpy >= 1.6
  • scipy >= 0.10
  • matplotlib >= 1.0.0
  • mpi4py >= 1.2
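
If you are not sure what is installed, a minimal version check along these lines (Python 2 syntax, matching the rest of the tutorial) will print what it finds:

# Quick sanity check of the packages listed above.
import IPython, numpy, scipy, matplotlib, mpi4py
for module in (IPython, numpy, scipy, matplotlib, mpi4py):
    print module.__name__, module.__version__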

Move to the directory containing the tarball and execute:

ipcluster start --engines=MPI --n 4
ipython notebook --pylab=inline

If you are installing the packages yourself, Continuum Analytics (disclaimer: Travis Oliphant is the CEO of this company) provides free community and paid professional versions of the Anaconda installer, which includes all the packages you will need for this portion of the tutorial. The installer is available from the Anaconda download page at Continuum Analytics.

You are also welcome to use the Enthought Python Distribution (EPD, free for academic users), available from the EPD download page at Enthought.

Unfortunately, if you go this route you will need to upgrade EPD's IPython to 0.13, which may be non-trivial; please see the Enthought discussion thread here.


Presentation mode

Slideshow mode is only supported by a development branch of IPython. If you would like to write your own slideshows (the notebooks work regardless of slideshow capabilities), you will need to install Matthias Bussonnier's slideshow_metadata branch. Here are some sample command-line instructions:

#!/bin/bash
git clone git://github.com/Carreau/ipython.git  # Matthias' IPython fork
cd ipython                                      # Enter the cloned repository
git checkout origin/slideshow_metadata          # Select the right branch
python setup.py develop                         # Install the development version
ipython notebook                                # Check out the slideshows!

Slideshow Setup Code


In [1]:
# The command below starts pylab inline mode, which simplifies syntax for many common 
# scientific computing commands.  All commands starting with % are IPython extensions (magics).
%pylab inline

# The following two lines of code load the slideshow extension and start it,
# you only need to run these if you are writing or running your own slideshow

%load_ext slidemode
%slideshow_mode


Welcome to pylab, a matplotlib-based Python environment [backend: module://IPython.zmq.pylab.backend_inline].
For more information, type 'help(pylab)'.

Acknowledgements


Connecting to MPI via IPython

To run the examples in parallel, you must run the command:

ipcluster start --engines=MPI --n 4

Or use notebook Appendix_03_Launch_MPI_Engines (if you are using the VMs or have this configured for your environment).

Then execute the next cell:


In [2]:
from IPython.parallel import Client
c = Client()
view = c[:]

%load_ext parallelmagic
view.activate()
view.block = True

A Quick Review of Scalability Concepts


The Multiple Forms of Parallelism

  • instruction - multiple program instructions are simultaneously dispatched in a pipeline or to multiple execution units (superscalar)
  • data - the same program instructions are carried out simultaneously on multiple data items (SIMD)
  • task - different program instructions on different data (MIMD)
  • collective - single program, multiple data, not necessarily synchronized at individual operation level (SPMD)

This part of the tutorial focuses on data and collective parallelism


Parallel Architectures

  • Most modern supercomputers are composed of distributed-memory compute nodes, and many of the largest systems connect them over toroidal networks

    • close mapping to physically n-dimensional problems
    • efficient algorithms exist for local point-to-point and collective communications across toroids
  • We expect to see higher-dimensional interconnects and networks that consume less power when not sending traffic


Parallel Programming Paradigms

  • a parallel programming paradigm is a specific approach to exploiting parallelism in hardware
  • many programming paradigms are very tightly coupled to the hardware beneath!

    • CUDA assumes large register files, Single Instruction, Multiple Thread (SIMT) parallelism, and a mostly flat, structured memory model, matching the underlying GPU hardware

    • OpenMP exposes loop level parallelism with a fork/join model, assumes the presence of shared memory and atomics

    • OpenCL tries to generalize CUDA, but still assumes a 'coprocessor' approach, where kernels are shipped from a master core to worker cores


The Message Passing Model

  • a process is (traditionally) a program counter for instructions and an address space for data
  • processes may have multiple threads (program counters and associated stacks) sharing a single address space
  • message passing is for communication among processes, which have separate address spaces
  • interprocess communication consists of

    • synchronization
    • movement of data from one process's address space to another's

Why MPI?

  • communicators encapsulate communication spaces for library safety
  • datatypes reduce copying costs and permit heterogeneity
  • multiple communication modes allow more control of memory buffer management
  • extensive collective operations for scalable global communication
  • process topologies permit efficient process placement and user views of process layout (a Cartesian topology sketch follows this list)
  • profiling interface encourages portable tools
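
As a minimal illustration of process topologies, the sketch below arranges the MPI processes in a 2x2 periodic Cartesian grid; it assumes exactly 4 processes, matching the 4 engines started earlier:

# Arrange 4 processes in a 2x2 periodic Cartesian grid and query neighbors.
# Illustrative sketch only; assumes exactly 4 MPI processes.
from mpi4py import MPI

comm = MPI.COMM_WORLD
cart = comm.Create_cart([2, 2], periods=[True, True])

coords = cart.Get_coords(cart.Get_rank())
# ranks of the neighbors one step away along the first dimension
src, dest = cart.Shift(0, 1)
print "rank %d: coords %s, neighbors along dim 0: %d and %d" % (
    cart.Get_rank(), coords, src, dest)

cart.Free()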

It Scales!


Python + MPI: Does It Scale?


The Scalability of Python + MPI


MPI - Quick Review

  • processes can be collected into groups
  • each message is sent in a context, and must be received in the same context
  • a communicator encapsulates a context for a specific group
  • a given program may have many communicators with any level of overlap
  • two initial communicators

    • MPI_COMM_WORLD (all processes)
    • MPI_COMM_SELF (current process)

First Example: Hello World

For interactive convenience, we load the parallel magic extensions and make this view the active one for the automatic parallelism magics.

This is not strictly necessary, and production codes likely won't use it, since the engines will load their own MPI code separately; but it makes it easy to illustrate everything from within a single notebook.


In [3]:
from IPython.parallel import Client
c = Client()
view = c[:]

%load_ext parallelmagic
view.activate()
view.block = True

Use the autopx magic to make subsequent cells execute on the engines instead of locally.


In [4]:
%autopx


%autopx enabled

With autopx enabled, the next cell will actually execute entirely on each engine:


In [5]:
from mpi4py import MPI

size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

print("Hello World! I am process %d of %d on %s.\n" % (rank, size, name))


[stdout:0] 
Hello World! I am process 2 of 4 on pyhpc01.

[stdout:1] 
Hello World! I am process 3 of 4 on pyhpc01.

[stdout:2] 
Hello World! I am process 0 of 4 on pyhpc01.

[stdout:3] 
Hello World! I am process 1 of 4 on pyhpc01.


Communicators

  • processes can be collected into groups
  • each message is sent in a context, and must be received in the same context
  • a communicator encapsulates a context for a specific group
  • a given program may have many communicators with any level of overlap (e.g., created with Comm.Split, as sketched after this list)
  • two initial communicators

    • MPI_COMM_WORLD (all processes)
    • MPI_COMM_SELF (current process)
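
Beyond the two initial communicators, new ones are created by the program itself; here is a minimal sketch using Comm.Split to partition COMM_WORLD by a user-chosen color:

# Split COMM_WORLD into two sub-communicators: even ranks and odd ranks.
# Illustrative sketch; run on the 4 MPI engines started earlier.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

color = rank % 2                   # 0 for even ranks, 1 for odd ranks
subcomm = comm.Split(color, rank)  # key=rank keeps the original ordering

print "world rank %d -> color %d, sub-rank %d of %d" % (
    rank, color, subcomm.Get_rank(), subcomm.Get_size())

subcomm.Free()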

Datatypes

  • the data in a message to send or receive is described by address, count and datatype
  • a datatype is recursively defined as:

    • predefined, corresponding to a data type from the language (e.g., MPI_INT, MPI_DOUBLE)
    • a contiguous, strided block, or indexed array of blocks of MPI datatypes
    • an arbitrary structure of datatypes
  • there are MPI functions to construct custom datatypes (see the sketch after this list)
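
A minimal sketch of a custom datatype: a strided "vector" type that sends every other element of an array (the Mandelbrot example later uses the same machinery with Create_contiguous); it assumes at least 2 processes:

# Build a strided datatype: 5 blocks of 1 double, with a stride of 2 doubles.
# Illustrative sketch; needs at least 2 MPI processes.
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

strided = MPI.DOUBLE.Create_vector(5, 1, 2)
strided.Commit()

if rank == 0:
    data = numpy.arange(10, dtype='d')
    # send 1 instance of the strided type: elements 0, 2, 4, 6, 8
    comm.Send([data, 1, strided], dest=1, tag=0)
elif rank == 1:
    recv = numpy.empty(5, dtype='d')
    comm.Recv([recv, MPI.DOUBLE], source=0, tag=0)
    print "received strided elements:", recv

strided.Free()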


Tags

  • messages are sent with an accompanying user-defined integer tag to assist the receiving process in identifying the message
  • messages can be screened at the receiving end by specifying the expected tag, or left unscreened by using MPI_ANY_TAG (see the sketch after this list)
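
A minimal sketch of tags, using MPI.ANY_TAG together with a Status object to discover which tag a received message carried (assumes at least 2 processes):

# Two messages with different tags; the receiver accepts any tag and then
# inspects it via the Status object.  Illustrative sketch only.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    comm.send("first message", dest=1, tag=10)
    comm.send("second message", dest=1, tag=20)
elif rank == 1:
    status = MPI.Status()
    for _ in range(2):
        msg = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        print "got %r with tag %d" % (msg, status.Get_tag())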

Functionality

  • There are hundreds of functions in the MPI standard; not all of them are available in mpi4py, but the most commonly used ones are
  • No need to call MPI_Init() or MPI_Finalize() (see the sketch after this list)
    • MPI_Init() is called when you import the module
    • MPI_Finalize() is called before the Python process ends
  • To launch: mpirun -np <number of processes> -machinefile <hostfile> python <my mpi4py script>
  • IPython automatically handles calling mpirun for you with the ipcluster command
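
A minimal sketch verifying the automatic initialization:

# mpi4py calls MPI_Init() when the module is imported and arranges for
# MPI_Finalize() to run when the Python process exits.
from mpi4py import MPI

print "MPI initialized?", MPI.Is_initialized()  # True already
print "MPI finalized?", MPI.Is_finalized()      # False while the process runs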

MPI Basic (Blocking) Send

   int MPI_Send(void* buf, int count, MPI_Datatype type, 
   int dest, int tag, MPI_Comm comm)

Python (mpi4py)

  Comm.Send(self, buf, int dest=0, int tag=0)
  Comm.send(self, obj=None, int dest=0, int tag=0)

MPI Basic (Blocking) Recv

   int MPI_Recv(void* buf, int count, MPI_Datatype type, 
   int source, int tag, MPI_Comm comm, MPI_Status *status)

Python (mpi4py)

  Comm.Recv(self, buf, int source=0, int tag=0, Status status=None)
  Comm.recv(self, obj=None, int source=0, int tag=0, Status status=None)

Send/Receive Example (lowercase convenience methods)


In [6]:
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
   data = {'a': 7, 'b': 3.14}
   comm.send(data, dest=1, tag=11)
elif rank == 1:
   data = comm.recv(source=0, tag=11)
   print data


[stdout:3] {'a': 7, 'b': 3.14}

Send/Receive Example (MPI API on numpy)


In [7]:
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# pass explicit MPI datatypes
if rank == 0:
   data = numpy.arange(1000, dtype='i')
   comm.Send([data, MPI.INT], dest=1, tag=77)
elif rank == 1:
   data = numpy.empty(1000, dtype='i')
   comm.Recv([data, MPI.INT], source=0, tag=77)

# or take advantage of automatic MPI datatype discovery
if rank == 0:
   data = numpy.arange(100, dtype=numpy.float64)
   comm.Send(data, dest=1, tag=13)
elif rank == 1:
   data = numpy.empty(100, dtype=numpy.float64)
   comm.Recv(data, source=0, tag=13)
   print data


[stdout:3] 
[  0.   1.   2.   3.   4.   5.   6.   7.   8.   9.  10.  11.  12.  13.  14.
  15.  16.  17.  18.  19.  20.  21.  22.  23.  24.  25.  26.  27.  28.  29.
  30.  31.  32.  33.  34.  35.  36.  37.  38.  39.  40.  41.  42.  43.  44.
  45.  46.  47.  48.  49.  50.  51.  52.  53.  54.  55.  56.  57.  58.  59.
  60.  61.  62.  63.  64.  65.  66.  67.  68.  69.  70.  71.  72.  73.  74.
  75.  76.  77.  78.  79.  80.  81.  82.  83.  84.  85.  86.  87.  88.  89.
  90.  91.  92.  93.  94.  95.  96.  97.  98.  99.]

Synchronization

   int MPI_Barrier(MPI_Comm comm)

Python (mpi4py)

   comm.Barrier(self)
   comm.barrier(self)

In [8]:
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

for r_id in range(comm.Get_size()):
    if rank == r_id:
        print "Hello from proc:", rank
    comm.Barrier()


[stdout:0] Hello from proc: 2
[stdout:1] Hello from proc: 3
[stdout:2] Hello from proc: 0
[stdout:3] Hello from proc: 1

Timing and Profiling

The elapsed (wall-clock) time between two points in an MPI program can be computed using MPI_Wtime:


In [9]:
t1 = MPI.Wtime()
t2 = MPI.Wtime()
print("time elapsed is: %e\n" % (t2-t1))


[stdout:0] 
time elapsed is: 6.008148e-05

[stdout:1] 
time elapsed is: 8.392334e-05

[stdout:2] 
time elapsed is: 8.511543e-05

[stdout:3] 
time elapsed is: 6.198883e-05
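
The two Wtime calls above bracket essentially no work; a slightly more realistic sketch times a loop of barriers, synchronizing first so every rank enters the timed region together:

# Time 100 barriers; the initial Barrier lines the ranks up so the measured
# interval is comparable across processes.  Illustrative sketch only.
from mpi4py import MPI

comm = MPI.COMM_WORLD

comm.Barrier()
t1 = MPI.Wtime()
for _ in range(100):
    comm.Barrier()
t2 = MPI.Wtime()

print "rank %d: average Barrier time %e s" % (comm.Get_rank(), (t2 - t1) / 100)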


Broadcast Example

   int MPI_Bcast(void *buf, int count, MPI_Datatype type, 
   int root, MPI_Comm comm)

Python (mpi4py)

  comm.Bcast(self, buf, int root=0)
  comm.bcast(self, obj=None, int root=0)

In [10]:
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
   data = {'key1' : [7, 2.72, 2+3j],
           'key2' : ( 'abc', 'xyz')}
else:
   data = None
data = comm.bcast(data, root=0)
print "bcast finished and data \
 on rank %d is: "%comm.rank, data


[stdout:0] bcast finished and data  on rank 2 is:  {'key2': ('abc', 'xyz'), 'key1': [7, 2.72, (2+3j)]}
[stdout:1] bcast finished and data  on rank 3 is:  {'key2': ('abc', 'xyz'), 'key1': [7, 2.72, (2+3j)]}
[stdout:2] bcast finished and data  on rank 0 is:  {'key2': ('abc', 'xyz'), 'key1': [7, 2.72, (2+3j)]}
[stdout:3] bcast finished and data  on rank 1 is:  {'key2': ('abc', 'xyz'), 'key1': [7, 2.72, (2+3j)]}
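
The lowercase bcast pickles arbitrary Python objects; for large NumPy arrays, here is a minimal sketch of the uppercase, buffer-based Bcast:

# Broadcast a NumPy buffer with the uppercase (buffer-based) API; every rank
# must pre-allocate an array of the same shape and dtype.
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

data = numpy.empty(5, dtype='d')
if rank == 0:
    data[:] = numpy.arange(5, dtype='d')

comm.Bcast([data, MPI.DOUBLE], root=0)
print "rank %d has:" % rank, data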

Scatter Example:


In [11]:
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
   data = [(i+1)**2 for i in range(size)]
else:
   data = None
data = comm.scatter(data, root=0)
assert data == (rank+1)**2
print "data on rank %d is: "%comm.rank, data


[stdout:0] data on rank 2 is:  9
[stdout:1] data on rank 3 is:  16
[stdout:2] data on rank 0 is:  1
[stdout:3] data on rank 1 is:  4

Gather (and Barrier) Example:


In [12]:
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = (rank+1)**2
print "before gather, data on \
  rank %d is: "%rank, data

comm.Barrier()
data = comm.gather(data, root=0)
if rank == 0:
   for i in range(size):
       assert data[i] == (i+1)**2
else:
   assert data is None
print "data on rank: %d is: "%rank, data


[stdout:0] 
before gather, data on   rank 2 is:  9
data on rank: 2 is:  None
[stdout:1] 
before gather, data on   rank 3 is:  16
data on rank: 3 is:  None
[stdout:2] 
before gather, data on   rank 0 is:  1
data on rank: 0 is:  [1, 4, 9, 16]
[stdout:3] 
before gather, data on   rank 1 is:  4
data on rank: 1 is:  None

Collective Example


Reduce Example:


In [13]:
from mpi4py import MPI
comm = MPI.COMM_WORLD

sendmsg = comm.rank

recvmsg1 = comm.reduce(sendmsg, op=MPI.SUM, root=0)

recvmsg2 = comm.allreduce(sendmsg)
print recvmsg2


[stdout:0] 6
[stdout:1] 6
[stdout:2] 6
[stdout:3] 6
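
As with bcast, the uppercase Allreduce works on NumPy buffers and takes separate send and receive buffers; a minimal sketch:

# Element-wise sum of a NumPy array across all ranks using the uppercase API.
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

sendbuf = numpy.arange(4, dtype='d') * (rank + 1)
recvbuf = numpy.empty_like(sendbuf)

comm.Allreduce([sendbuf, MPI.DOUBLE], [recvbuf, MPI.DOUBLE], op=MPI.SUM)
print "rank %d: elementwise sum =" % rank, recvbuf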

Compute Pi Example


In [14]:
from mpi4py import MPI
import math

def compute_pi(n, start=0, step=1):
    h = 1.0 / n
    s = 0.0
    for i in range(start, n, step):
        x = h * (i + 0.5)
        s += 4.0 / (1.0 + x**2)
    return s * h

comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
myrank = comm.Get_rank()
if myrank == 0:
    n = 10
else:
    n = None

n = comm.bcast(n, root=0)

mypi = compute_pi(n, myrank, nprocs)

pi = comm.reduce(mypi, op=MPI.SUM, root=0)

if myrank == 0:
    error = abs(pi - math.pi)
    print ("pi is approximately %.16f, error is %.16f" % (pi, error))


[stdout:2] pi is approximately 3.1424259850010983, error is 0.0008333314113051

Mandelbrot Set Example


In [15]:
def mandelbrot(x, y, maxit):
    c = x + y*1j
    z = 0 + 0j
    it = 0
    while abs(z) < 2 and it < maxit:
        z = z**2 + c
        it += 1
    return it

x1, x2 = -2.0, 1.0
y1, y2 = -1.0, 1.0
w, h = 150, 100
maxit = 127

from mpi4py import MPI
import numpy
  
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# number of rows to compute here
N = h // size + (h % size > rank)

# first row to compute here
start = comm.scan(N)-N

# array to store local result
Cl = numpy.zeros([N, w], dtype='i')

# compute owned rows

dx = (x2 - x1) / w
dy = (y2 - y1) / h

for i in range(N):
    y = y1 + (i + start) * dy
    for j in range(w):
        x = x1 + j * dx
        Cl[i, j] = mandelbrot(x, y, maxit)

# gather results at root (process 0)
counts = comm.gather(N, root=0)
C = None
if rank == 0:
    C = numpy.zeros([h, w], dtype='i')

rowtype = MPI.INT.Create_contiguous(w)
rowtype.Commit()

comm.Gatherv(sendbuf=[Cl, MPI.INT], recvbuf=[C, (counts, None), rowtype],root=0)

rowtype.Free()

We now need to "pull" the C array back to the client for plotting, so we disable autopx. Make sure to re-enable it later on.


In [16]:
%autopx


%autopx disabled

In [17]:
# CC is a list of the C arrays from all engines; only MPI rank 0 holds the gathered result
CC = view['C']
ranks = view['rank']

# Do the plotting
from matplotlib import pyplot
# Some magic to get to MPI4PY rank 0, not necessarily engine id 0
pyplot.imshow(CC[ranks.index(0)], aspect='equal')
pyplot.spectral()
pyplot.show()


Toggle autopx back


In [18]:
%autopx


%autopx enabled

Advanced Capabilities

Other features supported by mpi4py

  • dynamic process spawning: Spawn(), Connect() and Disconnect()
  • one-sided communication: Put(), Get(), Accumulate() (a minimal Put() sketch follows this list)
  • MPI-IO: Open(), Close(), Get_view() and Set_view()
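
A minimal sketch of one-sided communication with MPI.Win and Put(), using fences for synchronization (assumes at least 2 processes):

# Each process exposes a one-element window; rank 0 writes directly into
# rank 1's memory with Put().  Illustrative sketch only.
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

mem = numpy.zeros(1, dtype='d')
win = MPI.Win.Create(mem, comm=comm)

win.Fence()
if rank == 0:
    buf = numpy.array([3.14])
    win.Put([buf, MPI.DOUBLE], 1)  # (origin buffer, target rank)
win.Fence()

if rank == 1:
    print "value received via Put:", mem[0]

win.Free()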

More Comprehensive mpi4py Tutorials

Interesting Scalable Applications and Tools

Addressing the Import Problem

--> See Appendix 01 References for more resources